package cn.edu360.beans

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark batch job that aggregates advertising KPIs per (province, city).
  *
  * The job reads ad log records from a Parquet file, turns every record into a
  * vector of nine counters/amounts, sums those vectors per (province, city) and
  * writes one row per key to a MySQL table over JDBC.
  */
object GeoDist {

  /**
    * Names of the result columns, in the order the values are written.
    * NOTE(review): the target table schema is not visible in this file; these
    * names mirror the metric names used in the code — confirm against the table.
    */
  private val ResultColumns: Seq[String] = Seq(
    "provincename", "cityname",
    "startRequest", "effectRequest", "adRequest",
    "participateBid", "successBid",
    "showNum", "clickNum",
    "DSPAdConsu", "DSPAdCost")

  /** Maps a boolean condition to a summable 1.0 / 0.0 flag. */
  private def flag(condition: Boolean): Double = if (condition) 1.0 else 0.0

  /**
    * Computes the per-record KPI contributions.
    *
    * @param requestmode request type (1: request, 2: impression, 3: click)
    * @param processnode process node (1: request-count KPI, 2: valid request, 3: ad request)
    * @param iseffective validity flag (0: invalid, 1: valid, i.e. billable)
    * @param isbilling   billing flag (0: not billed, 1: billed)
    * @param isbid       whether the record is an RTB bid (1: yes)
    * @param iswin       whether the bid was won (1: yes)
    * @param adorderid   ad order id; 0 is excluded from the participating-bid count
    * @param winprice    RTB winning bid price
    * @param adpayment   converted ad spend
    * @return nine values in the order: startRequest, effectRequest, adRequest,
    *         participateBid, successBid, showNum, clickNum, DSPAdConsu, DSPAdCost
    */
  private def kpiVector(requestmode: Int,
                        processnode: Int,
                        iseffective: Int,
                        isbilling: Int,
                        isbid: Int,
                        iswin: Int,
                        adorderid: Int,
                        winprice: Double,
                        adpayment: Double): Vector[Double] = {
    val isRequest = requestmode == 1
    val billable = iseffective == 1 && isbilling == 1
    val billableBid = billable && isbid == 1

    Vector(
      flag(isRequest && processnode >= 1),              // startRequest
      flag(isRequest && processnode >= 2),              // effectRequest
      flag(isRequest && processnode == 3),              // adRequest
      flag(billableBid && adorderid != 0),              // participateBid
      flag(billable && iswin == 1),                     // successBid
      flag(requestmode == 2 && iseffective == 1),       // showNum
      flag(requestmode == 3 && iseffective == 1),       // clickNum
      if (billableBid) winprice / 1000 else 0.0,        // DSPAdConsu
      if (billableBid) adpayment / 1000 else 0.0        // DSPAdCost
    )
  }

  /**
    * Entry point: reads the Parquet log, aggregates the KPIs per (province, city)
    * and appends the totals to the configured MySQL table.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getSimpleName)

    val url = "jdbc:mysql://192.168.254.5:3306/test?characterEncoding=utf-8"
    val table = "dmpwrite1"
    val props = new Properties()
    // NOTE(review): credentials are hard-coded in source; move them to external
    // configuration before this runs anywhere but a local sandbox.
    props.setProperty("user","root")
    props.setProperty("password","327652")
    props.setProperty("driver","com.mysql.jdbc.Driver")

    val sc = new SparkContext(conf)
    // Stop the context even when reading, aggregating or writing fails.
    try {
      val sqlContext = new SQLContext(sc)
      // Brings `toDF` for RDDs of tuples into scope.
      import sqlContext.implicits._

      val parquet: DataFrame = sqlContext.read.parquet("d:/ParquetFile")

      // `.rdd` keeps the row-wise mapping independent of the DataFrame `map`
      // signature, which differs between Spark versions.
      val totals: RDD[((String, String), Vector[Double])] = parquet.rdd
        .map { t =>
          val provincename: String = t.getString(24)
          val cityname: String = t.getString(25)
          val metrics = kpiVector(
            requestmode = t.getInt(8),
            processnode = t.getInt(35),
            iseffective = t.getInt(30),
            isbilling = t.getInt(31),
            isbid = t.getInt(39),
            iswin = t.getInt(42),
            adorderid = t.getInt(2),
            winprice = t.getDouble(41),
            adpayment = t.getDouble(78))
          ((provincename, cityname), metrics)
        }
        // Element-wise sum per key; reduceByKey combines map-side, so only one
        // partial vector per key and partition is shuffled.
        .reduceByKey((left, right) => left.zip(right).map { case (a, b) => a + b })

      val result: DataFrame = totals
        .map { case ((provincename, cityname), m) =>
          (provincename, cityname, m(0), m(1), m(2), m(3), m(4), m(5), m(6), m(7), m(8))
        }
        .toDF(ResultColumns: _*)

      // The write is the action that actually executes the pipeline.
      // NOTE(review): "append" is an assumption so reruns do not fail on an
      // existing table — confirm the intended save mode.
      result.write.mode("append").jdbc(url, table, props)
    } finally {
      sc.stop()
    }
  }
}
